package TestSparkStreaming;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import scala.Tuple2;

import java.util.Arrays;
import java.util.Iterator;

/**
 * Demonstrates the {@code reduceByKeyAndWindow} operator with a streaming word count.
 *
 * <p>The operator plays much the same role as {@code reduceByKey}; the difference is that
 * {@code reduceByKeyAndWindow} reduces only the data received within the current window,
 * whereas {@code reduceByKey} reduces the whole key-value RDD. In this example the job
 * prints, every 3 seconds, the number of occurrences of each word received during the
 * most recent 6 seconds.
 *
 * <p>Input is read as lines of text from a TCP socket on {@code localhost:9999}.
 */
public class TestReduceByKeyAndWindow {

    /** Batch interval of the streaming context, in milliseconds. */
    private static final long BATCH_INTERVAL_MS = 1000;

    /** Length of the window being reduced, in milliseconds (6 batches). */
    private static final long WINDOW_LENGTH_MS = 1000 * 6;

    /** Interval at which the windowed reduction is evaluated, in milliseconds (3 batches). */
    private static final long SLIDE_INTERVAL_MS = 1000 * 3;

    /**
     * Builds and runs the streaming job, blocking until the streaming context terminates.
     *
     * @param args command-line arguments (unused)
     * @throws InterruptedException if the thread is interrupted while awaiting termination
     */
    public static void main(String[] args) throws InterruptedException {
        SparkConf sparkConf = new SparkConf()
                .setAppName("TestReduceByKeyAndWindow")
                .setMaster("local[2]");
        JavaSparkContext sparkContext = new JavaSparkContext(sparkConf);
        JavaStreamingContext streamingContext =
                new JavaStreamingContext(sparkContext, new Duration(BATCH_INTERVAL_MS));

        try {
            JavaReceiverInputDStream<String> lines =
                    streamingContext.socketTextStream("localhost", 9999);

            // line -> words -> (word, 1)
            JavaPairDStream<String, Integer> wordOnes = lines
                    .flatMap(TestReduceByKeyAndWindow::splitWords)
                    .mapToPair(word -> new Tuple2<>(word, 1));

            // Sum the 1s per word over the last WINDOW_LENGTH_MS, re-evaluated every SLIDE_INTERVAL_MS.
            JavaPairDStream<String, Integer> reduceByKeyAndWindow = wordOnes.reduceByKeyAndWindow(
                    Integer::sum,
                    new Duration(WINDOW_LENGTH_MS),
                    new Duration(SLIDE_INTERVAL_MS));
            reduceByKeyAndWindow.print();

            streamingContext.start();
            streamingContext.awaitTermination();
        } finally {
            // Shut the streaming context down even when setup or awaitTermination() fails,
            // instead of leaving it running behind a propagating exception.
            streamingContext.stop();
        }
    }

    /**
     * Splits a line into its space-separated words.
     *
     * <p>{@code String.split(" ")} yields empty strings for a blank line, a leading space, or
     * consecutive spaces; those are dropped so they are not counted as a word.
     *
     * @param line one line of input text
     * @return an iterator over the non-empty tokens of {@code line}
     */
    private static Iterator<String> splitWords(String line) {
        return Arrays.stream(line.split(" "))
                .filter(word -> !word.isEmpty())
                .iterator();
    }
}
